package com.tbit.uqbike.tergateway.data;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.tbit.uqbike.TerGatewayMain;
import com.tbit.uqbike.service.db.DbService;
import com.tbit.uqbike.service.redis.RedisService;
import com.tbit.uqbike.task.DelayOfflineOrder;
import com.tbit.uqbike.task.OfflineTerCheck;
import com.tbit.uqbike.task.OrderTimeOutTask;
import com.tbit.uqbike.task.RedisBatReader;
import com.tbit.uqbike.task.RedisBatWriter;
import com.tbit.uqbike.task.RedisBatWriterSec;
import com.tbit.uqbike.task.StatDataTask;
import com.tbit.uqbike.task.TerOfflineCheckTask;
import com.tbit.uqbike.tergateway.config.TerGatewayConfig;
import com.tbit.uqbike.tergateway.entity.AConnInfo;
import com.tbit.uqbike.tergateway.entity.ConnInfo;
import com.tbit.uqbike.tergateway.entity.MqttConnInfo;
import com.tbit.uqbike.tergateway.entity.PlatformPointData;
import com.tbit.uqbike.tergateway.entity.ProtocolData;
import com.tbit.uqbike.tergateway.entity.RemoteControl;
import com.tbit.uqbike.tergateway.entity.TerHisAlarm;
import com.tbit.uqbike.tergateway.entity.TerHisMsg;
import com.tbit.uqbike.tergateway.entity.TerHisPos;
import com.tbit.uqbike.tergateway.entity.TerSoftInfo;
import com.tbit.uqbike.tergateway.entity.TerTempData;
import com.tbit.uqbike.tergateway.log.LOG;
import com.tbit.uqbike.tergateway.pojo.PlatformPoint;
import com.tbit.uqbike.tergateway.pojo.TerAlarm;
import com.tbit.uqbike.tergateway.pojo.TerBattery;
import com.tbit.uqbike.tergateway.pojo.TerMsg;
import com.tbit.uqbike.tergateway.pojo.TerOnline;
import com.tbit.uqbike.tergateway.pojo.TerOnlineHis;
import com.tbit.uqbike.tergateway.pojo.TerPos;
import com.tbit.uqbike.tergateway.pojo.TersoftwareKey;
import com.tbit.uqbike.util.BatHandle;
import com.tbit.uqbike.util.CollExUtils;
import com.tbit.uqbike.util.ConstDefine;
import com.tbit.uqbike.util.StringUtil;
import com.tbit.uqbike.util.TerPubUtil;
import com.tbit.uqbike.util.TurnTidToHashId;
import com.tbit.utils.PlatformPointConst;
import io.netty.buffer.PoolArenaMetric;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import redis.clients.util.JedisByteHashMap;

import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Created by MyWin on 2017/5/5.
 * Runtime in-memory data.
 */
public class TerGatewayData {
    /** Name given to the thread group that holds the batch-insert threads. */
    public static final String BatInsertThreadGroupName = "BatInsert";
    /** Shared converter instance; handed to CollExUtils.turnList in startInitCacheData. */
    public static final TurnTidToHashId turnTidToHashKey = new TurnTidToHashId();

    // Initialises the batch-insert fields declared just below. A static initializer may
    // assign, by simple name, static fields that are declared later in the class body.
    static {
        batInsertThreadGroup = new ThreadGroup(BatInsertThreadGroupName);
        hisPosBatInsert = new BatHandle();
        hisAlarmBatInsert = new BatHandle();
        terMsgBatInsert = new BatHandle();
    }

    /** Thread group used for the threads created in startDataService. */
    private static ThreadGroup batInsertThreadGroup;
    /**
     * Batch handlers, one per kind of history record (position, alarm, message).
     * NOTE(review): the original note reads roughly "cached blocking queue + scheduled
     * task approach"; presumably it describes how BatHandle batches its items - confirm.
     */
    private static BatHandle hisPosBatInsert;
    private static BatHandle hisAlarmBatInsert;
    private static BatHandle terMsgBatInsert;
    /** SLF4J logger for this class. */
    private static org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TerGatewayData.class);

    /**
     * Mapping from connection id to the connection object.
     */
    private static ConcurrentHashMap<String, AConnInfo> connIdToConn = new ConcurrentHashMap<String, AConnInfo>();
    /** Lock taken by CheckMqttConnect while it creates a missing MQTT connection entry. */
    private static ReentrantLock mqttConnSyncObj = new ReentrantLock();
    /**
     * Mapping from device number (mno) to its temporary cached data.
     * This is a plain HashMap; the get/create/list/remove accessors take mnoToTempDataLock.
     */
    private static HashMap<String, TerTempData> mnoToTempData = new HashMap<String, TerTempData>();
    private static ReentrantReadWriteLock mnoToTempDataLock = new ReentrantReadWriteLock();
    /**
     * Set of tids connected to this gateway.
     * Used to update the route key in real time.
     */
    private static HashSet<String> connMnoSet = new HashSet<>();
    /** Guards connMnoSet, including the swap performed by replaceConnTidSet. */
    private static ReentrantLock connMnoSetLock = new ReentrantLock();

    /**
     * Serial-number dictionary.
     */
    private static ConcurrentHashMap<String, Object> strSerNoMap = new ConcurrentHashMap<String, Object>();
    /**
     * Firmware cache dictionary, keyed by the string built in getGjCacheKey.
     */
    private static ConcurrentHashMap<String, byte[]> gJDataMap = new ConcurrentHashMap<String, byte[]>();

    /** Task instance scheduled with a fixed delay in startBatWriteTask. */
    public static DelayOfflineOrder delayOfflineOrder = new DelayOfflineOrder();

    /** Task instance scheduled in startBatWriteTask; showTerGatewayData also feeds it stat data. */
    public static StatDataTask statDataTask = new StatDataTask();
    /**
     * Scheduled-task manager; its pool size comes from TerGatewayConfig.OrderTimeOutCheckThreadPool.
     */
    private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(TerGatewayConfig.OrderTimeOutCheckThreadPool);

    /**
     * Checks whether a serial number is currently registered.
     *
     * @param str serial-number key
     * @return true when the serial-number dictionary holds an entry for {@code str}
     */
    public static boolean serNoCheck(String str) {
        // ConcurrentHashMap never stores null values, so a non-null lookup means "present".
        return strSerNoMap.get(str) != null;
    }

    /**
     * Registers (or replaces) the object associated with a serial number.
     *
     * @param str serial-number key; the backing ConcurrentHashMap rejects null keys
     * @param obj value to associate; the backing ConcurrentHashMap rejects null values
     */
    public static void setStrSerNoMap(String str, Object obj) {
        strSerNoMap.put(str, obj);
    }

    /**
     * Takes the object registered for a serial number out of the dictionary.
     * Despite the "get" name, the entry is removed by this call.
     *
     * @param str serial-number key
     * @return the object that was registered, or null when there was none
     */
    public static Object getStrSerNoMap(String str) {
        return strSerNoMap.remove(str);
    }

    /**
     * Schedules a one-shot OrderTimeOutTask for the given remote-control order.
     * The task runs after TerGatewayConfig.OrderTimeOutMin minutes.
     *
     * @param remoteControl order handed to the OrderTimeOutTask constructor
     */
    public static void addOrderTimeOutCheck(RemoteControl remoteControl) {
        executor.schedule(new OrderTimeOutTask(remoteControl), TerGatewayConfig.OrderTimeOutMin, TimeUnit.MINUTES);
    }

    /**
     * Registers all periodic background tasks on the shared scheduler.
     * Every task uses fixed-delay scheduling, i.e. the delay is measured from the end of
     * one run to the start of the next.
     */
    public static void startBatWriteTask() {
        final ScheduledExecutorService scheduler = executor;

        // Redis batch writer / reader: same configured initial wait, individual intervals (seconds).
        scheduler.scheduleWithFixedDelay(
                new RedisBatWriter(),
                TerGatewayConfig.iFirstRunCacheBatWriteWaitSec,
                TerGatewayConfig.iCacheBatWriteInvSec,
                TimeUnit.SECONDS);
        scheduler.scheduleWithFixedDelay(
                new RedisBatReader(),
                TerGatewayConfig.iFirstRunCacheBatWriteWaitSec,
                TerGatewayConfig.iCacheBatReadInvSec,
                TimeUnit.SECONDS);

        // Offline terminal check: initial delay and interval are the same configured minutes value.
        scheduler.scheduleWithFixedDelay(
                new OfflineTerCheck(),
                TerGatewayConfig.iOfflineTerCheckIvnMin,
                TerGatewayConfig.iOfflineTerCheckIvnMin,
                TimeUnit.MINUTES);

        // First run after 10 s, then one write every second.
        scheduler.scheduleWithFixedDelay(new RedisBatWriterSec(), 10, 1, TimeUnit.SECONDS);

        // Checks every 500 ms.
        // NOTE(review): the original comment said "run after 10 s", but the unit is
        // milliseconds, so the initial delay is 10 ms - confirm which was intended.
        scheduler.scheduleWithFixedDelay(delayOfflineOrder, 10, 500, TimeUnit.MILLISECONDS);

        // Stat data task: 10 ms initial delay, then every 500 ms.
        scheduler.scheduleWithFixedDelay(statDataTask, 10, 500, TimeUnit.MILLISECONDS);

        // First run after 90 s, then one check every 180 s.
        scheduler.scheduleWithFixedDelay(new TerOfflineCheckTask(), 90, 180, TimeUnit.SECONDS);
    }

    /**
     * Adds a tid to the set of terminals connected to this gateway.
     *
     * @param tid terminal id to record
     */
    public static void putConnTidSet(String tid) {
        // Acquire before entering try: finally must only release a lock that is actually held,
        // otherwise unlock() would throw IllegalMonitorStateException and mask the real failure.
        connMnoSetLock.lock();
        try {
            connMnoSet.add(tid);
        } finally {
            connMnoSetLock.unlock();
        }
    }

    /**
     * Swaps the connected-tid set for a fresh empty one and hands the old set to the caller.
     * After the swap the returned set is no longer referenced by this class.
     *
     * @return the set of tids collected since the previous call (never null)
     */
    public static HashSet<String> replaceConnTidSet() {
        // Allocate outside the critical section to keep the lock hold time minimal.
        HashSet<String> fresh = new HashSet<>();
        // Acquire before try so that finally only releases a lock that is held.
        connMnoSetLock.lock();
        try {
            HashSet<String> drained = connMnoSet;
            connMnoSet = fresh;
            return drained;
        } finally {
            connMnoSetLock.unlock();
        }
    }

    //region Connection and mapping information
    /**
     * Registers a new connection, keyed by the string form of its channel id.
     *
     * @param ctx channel handler context of the new connection; wrapped in a ConnInfo
     */
    public static void AddConnect(ChannelHandlerContext ctx) {
        final String connId = ctx.channel().id().toString();
        if (LOG.bConnMnoLog) {
            LOG.NET.info(String.format("add conn:%s", connId));
        }
        connIdToConn.put(connId, new ConnInfo(ctx));
    }

    /**
     * Registers an already-built connection object under its own connId,
     * replacing any entry previously stored for that id.
     *
     * @param conn connection to register; must not be null
     */
    public static void AddConnect(AConnInfo conn) {
        connIdToConn.put(conn.connId, conn);
    }

    /**
     * Stores the protocol version and reserve value on a connection object.
     * Does nothing when {@code info} is null.
     *
     * @param info    connection to update, may be null
     * @param version value written to {@code info.version}
     * @param reserve value written to {@code info.reserve}
     */
    public static void updateVerAndRev(AConnInfo info, byte version, int reserve) {
        if (info == null) {
            return;
        }
        info.version = version;
        info.reserve = reserve;
    }

    /**
     * Removes a connection from the connection map.
     *
     * @param connId id of the connection to drop
     */
    public static void DelConnect(String connId) {
        if (LOG.bConnMnoLog) {
            final String message = String.format("del conn:%s", connId);
            LOG.NET.info(message);
        }
        connIdToConn.remove(connId);
    }

    /**
     * Looks up a connection by id.
     *
     * @param connId connection id
     * @return the registered connection, or null when the id is unknown
     */
    public static AConnInfo GetConnect(String connId) {
        final AConnInfo conn = connIdToConn.get(connId);
        return conn;
    }

    /**
     * Returns the connection registered for {@code connId}, creating and registering a
     * MqttConnInfo when none exists yet.
     *
     * @param connId connection id
     * @return the existing or newly created connection object (never null)
     */
    public static AConnInfo CheckMqttConnect(String connId) {
        AConnInfo connInfo = connIdToConn.get(connId);
        if (connInfo != null) {
            return connInfo;
        }
        // Acquire before try so that finally only releases a lock that is held.
        // The lock keeps concurrent callers of this method from each building a MqttConnInfo.
        mqttConnSyncObj.lock();
        try {
            connInfo = connIdToConn.get(connId);
            if (connInfo == null) {
                AConnInfo created = new MqttConnInfo();
                created.connId = connId;
                // AddConnect writes to the map without taking mqttConnSyncObj, so an entry can
                // appear between the get above and this insert; putIfAbsent keeps that entry
                // instead of overwriting it.
                AConnInfo raced = connIdToConn.putIfAbsent(connId, created);
                connInfo = (raced != null) ? raced : created;
            }
        } finally {
            mqttConnSyncObj.unlock();
        }
        return connInfo;
    }

    /**
     * Resolves the device number bound to a connection.
     *
     * @param connId connection id
     * @return the connection's {@code mno}, or StringUtil.Empty when the connection is unknown
     */
    public static String getMnoByConnId(String connId) {
        final AConnInfo connInfo = connIdToConn.get(connId);
        if (connInfo == null) {
            return StringUtil.Empty;
        }
        return connInfo.mno;
    }

    /**
     * Gets the temporary data of a device, creating it when it does not exist yet.
     *
     * @param mno device number
     * @return the existing or newly created TerTempData (never null)
     */
    public static TerTempData getTerTempDataByMno(String mno) {
        TerTempData terTempData;
        // Locks are acquired before try so that finally only releases a lock that is held.
        mnoToTempDataLock.readLock().lock();
        try {
            terTempData = mnoToTempData.get(mno);
        } finally {
            mnoToTempDataLock.readLock().unlock();
        }
        if (terTempData != null) {
            return terTempData;
        }

        boolean created = false; // true when this thread inserted the entry and must initialise it
        mnoToTempDataLock.writeLock().lock();
        try {
            // Re-check under the write lock: another thread may have inserted it meanwhile.
            terTempData = mnoToTempData.get(mno);
            if (terTempData == null) {
                terTempData = new TerTempData(mno);
                mnoToTempData.put(mno, terTempData);
                created = true;
            }
        } finally {
            mnoToTempDataLock.writeLock().unlock();
        }
        if (created) {
            // Runs after the write lock has been released, and only on the inserting thread.
            terTempData.refreshMemData();
        }
        return terTempData;
    }

    /**
     * Tries to get the temporary data of a device without creating it.
     *
     * @param mno device number
     * @return the cached TerTempData, or null when the device has no entry
     */
    public static TerTempData tryGetTerTempDataByMno(String mno) {
        // Acquire before try so that finally only releases a lock that is held.
        mnoToTempDataLock.readLock().lock();
        try {
            return mnoToTempData.get(mno);
        } finally {
            mnoToTempDataLock.readLock().unlock();
        }
    }

    /**
     * Takes a snapshot of all cached TerTempData objects.
     * The returned list is a copy; later changes to the cache do not affect it.
     *
     * @return a new list holding every cached TerTempData (possibly empty, never null)
     */
    public static List<TerTempData> getAllTerTempData() {
        // Copying only reads the map, so the shared read lock is sufficient; it still excludes
        // writers for the duration of the copy. Acquired before try so that finally only
        // releases a lock that is held.
        mnoToTempDataLock.readLock().lock();
        try {
            return new LinkedList<TerTempData>(mnoToTempData.values());
        } finally {
            mnoToTempDataLock.readLock().unlock();
        }
    }

    /**
     * Takes a snapshot of the device numbers of all cached terminals.
     * The returned list is a copy; later changes to the cache do not affect it.
     *
     * @return a new list holding every cached key (possibly empty, never null)
     */
    public static List<String> getAllTerTid() {
        // Copying only reads the map, so the shared read lock is sufficient; it still excludes
        // writers for the duration of the copy. Acquired before try so that finally only
        // releases a lock that is held.
        mnoToTempDataLock.readLock().lock();
        try {
            return new LinkedList<String>(mnoToTempData.keySet());
        } finally {
            mnoToTempDataLock.readLock().unlock();
        }
    }

    /**
     * Removes the cached temporary data of the given devices.
     * A null or empty list is a no-op and does not take the write lock.
     *
     * @param tids device numbers whose cache entries should be dropped
     */
    public static void removeTerMnos(List<String> tids) {
        if (tids == null || tids.isEmpty()) {
            return;
        }
        // Acquire before try so that finally only releases a lock that is held.
        mnoToTempDataLock.writeLock().lock();
        try {
            for (String key : tids) {
                mnoToTempData.remove(key);
            }
        } finally {
            mnoToTempDataLock.writeLock().unlock();
        }
    }

    /**
     * Logs the current gateway state (connection count, terminal cache size, pooled direct
     * buffer allocation counters, Redis run counter) and queues the platform point message.
     * Logging is best-effort: a failure there is reported and does not prevent the point
     * message from being queued.
     */
    public static void showTerGatewayData() {
        final int connSize = connIdToConn.size();
        int terSize;
        // mnoToTempData is a plain HashMap, so even size() is read under the lock.
        mnoToTempDataLock.readLock().lock();
        try {
            terSize = mnoToTempData.size();
        } finally {
            mnoToTempDataLock.readLock().unlock();
        }
        try {
            String str = String.format("Curr Connnect:%d Ter Cache:%d", connSize, terSize);
            logger.info(str);
            // Sum the allocation / deallocation counters over all direct arenas.
            long directAllCnt = 0, directDeaCnt = 0;
            List<PoolArenaMetric> metrics = PooledByteBufAllocator.DEFAULT.directArenas();
            for (PoolArenaMetric metric : metrics) {
                directAllCnt += metric.numAllocations();
                directDeaCnt += metric.numDeallocations();
            }
            logger.info(String.format("Dir Pool Mem Total:%d Total Free:%d", directAllCnt, directDeaCnt));
            // Redis statistics.
            RedisService redis = TerGatewayMain.getRedis();
            logger.info("Redis Stat:" + redis.getRunCnt());
        } catch (Exception e) {
            // Previously swallowed silently; keep going, but leave a trace of the failure.
            logger.warn("showTerGatewayData: failed to log gateway statistics", e);
        }
        TerGatewayData.statDataTask.addStatData(TerGatewayConfig.pointDataStatKey, getPlamformPointDataMsg(connSize, terSize));
    }

    /**
     * Builds the JSON message carrying the two platform data points of this gateway:
     * connection count and terminal count.
     *
     * @param connSize current number of connections
     * @param terSize  current number of cached terminals
     * @return JSON string with the keys "msgId", "feedback" and "data"
     */
    public static String getPlamformPointDataMsg(int connSize, int terSize) {
        final PlatformPointData payload = new PlatformPointData();
        payload.ident = TerGatewayConfig.moduleName;
        payload.datas = new LinkedList<>();

        // Data point 1: connection count.
        final PlatformPoint connPoint = new PlatformPoint();
        connPoint.setDataType(PlatformPointConst.CONN_CNT);
        connPoint.setIdent(payload.ident);
        connPoint.setDt(new Date());
        connPoint.setDataValue(String.valueOf(connSize));
        payload.datas.add(connPoint);

        // Data point 2: terminal count.
        final PlatformPoint terPoint = new PlatformPoint();
        terPoint.setDataType(PlatformPointConst.TER_CNT);
        terPoint.setIdent(payload.ident);
        terPoint.setDt(new Date());
        terPoint.setDataValue(String.valueOf(terSize));
        payload.datas.add(terPoint);

        final JSONObject envelope = new JSONObject();
        envelope.put("msgId", ConstDefine.MQ_MSG_ID_PLATFORM_PP);
        envelope.put("feedback", StringUtil.Empty);
        envelope.put("data", payload);
        return envelope.toJSONString();
    }

    //endregion

    //region Data access API: read from the cache first, then from the database
    public static final String REIDS_ORDER_LIST_TER = "order";
    // Prefix of the per-terminal Redis hash id, see getTerHashId.
    public static final String hashIdHead = "uq";
    // The constants below are field names inside the per-terminal Redis hash.
    public static final String terLastPos = "tLP";
    public static final String terLastBattery = "tLB";
    public static final String terMqRouteKey = "tRK";
    public static final String terSyncTime = "tST";
    public static final String terLastStatusKey = "tLS";
    // Value returned by getTerSyncTimeByMno when no sync time is cached.
    public static final int DefaultSyncTime = 0;
    public static final String terSyncValue = "tSV";
    public static final String terLastPkgDt = "tLT";
    public static final String terOnlineFlag = "tOF";// terminal online flag
    public static final String terNewSoftwave = "tNS";
    public static final String terProtoclData = "tPD";
    public static final String terLastVol = "tLV";

    public static final String terStopTime = "tLST";// start/end time of the terminal being stationary
    public static final String terLastStat = "tLSS";// latest terminal status

    /**
     * Returns the last known position of a device (business-side lookup).
     * Redis is consulted first; on a miss the database is queried and a non-null result
     * is written back to Redis.
     *
     * @param mno device number
     * @return the last position, or null when neither Redis nor the database has one
     */
    public static TerPos getTerLastPosByMno(String mno) {
        final RedisService redis = TerGatewayMain.getRedis();
        final String hashId = getTerHashId(mno);
        final String cached = redis.get(hashId, terLastPos);
        if (!StringUtil.IsNullOrEmpty(cached)) {
            return JSON.parseObject(cached, TerPos.class);
        }

        // Cache miss: load from the database and back-fill the cache.
        final DbService dbService = TerGatewayMain.getDbService();
        final TerPos fromDb = dbService.selectLastPosInfo(mno);
        if (fromDb != null) {
            redis.set(hashId, terLastPos, JSON.toJSONString(fromDb));
        }
        return fromDb;
    }

    /**
     * Returns the last known battery record of a device.
     * Redis is consulted first; on a miss the database is queried and a non-null result
     * is written back to Redis.
     *
     * @param mno device number
     * @return the last battery record, or null when neither Redis nor the database has one
     */
    public static TerBattery getTerBatteryByMno(String mno) {
        final RedisService redis = TerGatewayMain.getRedis();
        final String hashId = getTerHashId(mno);
        final String cached = redis.get(hashId, terLastBattery);
        if (!StringUtil.IsNullOrEmpty(cached)) {
            return JSON.parseObject(cached, TerBattery.class);
        }

        // Cache miss: load from the database and back-fill the cache.
        final DbService dbService = TerGatewayMain.getDbService();
        final TerBattery fromDb = dbService.selectLastTerBattery(mno);
        if (fromDb != null) {
            redis.set(hashId, terLastBattery, JSON.toJSONString(fromDb));
        }
        return fromDb;
    }

    /**
     * Reads the cached sync time of a device from Redis.
     *
     * @param mno device number
     * @return the cached sync time, or {@link #DefaultSyncTime} when nothing is cached or the
     *         cached text is not a valid integer
     */
    public static int getTerSyncTimeByMno(String mno) {
        RedisService redis = TerGatewayMain.getRedis();
        String hashId = getTerHashId(mno);
        String value = redis.get(hashId, terSyncTime);
        if (StringUtil.IsNullOrEmpty(value)) {
            return DefaultSyncTime;
        }
        try {
            return Integer.parseInt(value);
        } catch (NumberFormatException e) {
            // A corrupt cache entry must not break the caller; report it and use the default.
            logger.warn(String.format("Invalid sync time [%s] cached for mno:%s, using default", value, mno), e);
            return DefaultSyncTime;
        }
    }

    /**
     * Reads the cached sync value of a device from Redis.
     *
     * @param mno device number
     * @return the cached value, or StringUtil.Empty when nothing is cached
     */
    public static String getTerSyncValueByMno(String mno) {
        final RedisService redis = TerGatewayMain.getRedis();
        final String cached = redis.get(getTerHashId(mno), terSyncValue);
        if (StringUtil.IsNullOrEmpty(cached)) {
            return StringUtil.Empty;
        }
        return cached;
    }

    /**
     * Builds the Redis hash id of a device: the hash-id prefix, a dot, then the device number.
     *
     * @param machineNO device number
     * @return the hash id, e.g. "uq.12345" for device number "12345"
     */
    public static String getTerHashId(String machineNO) {
        return hashIdHead + "." + machineNO;
    }

    /**
     * Gets the upgrade configuration stored for a terminal.
     *
     * @param mno device number
     * @return the parsed TerSoftInfo, or null when Redis holds no (or an empty) value
     */
    public static TerSoftInfo getTersoftwareKeyByMno(String mno) {
        final RedisService redis = TerGatewayMain.getRedis();
        final String cached = redis.get(getTerHashId(mno), terNewSoftwave);
        if (StringUtil.IsNullOrEmpty(cached)) {
            return null;
        }
        logger.info(String.format("Get Ter SoftInfo:%s", cached));
        return JSON.parseObject(cached, TerSoftInfo.class);
    }

    /**
     * Overwrites the terminal's upgrade configuration in Redis with an empty string.
     *
     * @param mno device number
     */
    public static void setTersoftwareKeyEmpty(String mno) {
        TerGatewayMain.getRedis().set(getTerHashId(mno), terNewSoftwave, StringUtil.Empty);
    }

    //endregion

    //region Delayed, de-duplicated cache update

    /**
     * Currently a no-op: the body is empty and no state is cleared.
     */
    public static void clearData() {

    }

    /**
     * Starts the data services: one thread per batch handler (position, alarm, message),
     * then the initial cache load, then the periodic tasks.
     */
    public static void startDataService() {
        final BatHandle[] handlers = {hisPosBatInsert, hisAlarmBatInsert, terMsgBatInsert};
        for (BatHandle handler : handlers) {
            new Thread(batInsertThreadGroup, handler).start();
        }
        // Initialisation
        startInitCacheData();
        startBatWriteTask();
    }

    /**
     * Pre-loads the in-memory terminal cache from Redis.
     * Reads the gateway's tid set, fetches the per-terminal hashes in batches of
     * TerGatewayConfig.iRedisBatSize and refreshes each terminal's TerTempData from them.
     * Any exception is logged and otherwise ignored.
     */
    public static void startInitCacheData() {
        try {
            final long startMs = System.currentTimeMillis();
            final RedisService redis = TerGatewayMain.getRedis();
            final String tidSetKey = String.format("%s.tidset", TerGatewayConfig.gatewayNameIdent);
            final Set<String> tidSet = redis.Set_GetAllItem(tidSetKey);
            // Split the keys into batches.
            final List<List<String>> batches = CollExUtils.spiltSet(tidSet, TerGatewayConfig.iRedisBatSize, null);
            int cacheCnt = 0;
            for (List<String> batch : batches) {
                final List<Object> rows = redis.Hash_BatGetAll(CollExUtils.turnList(batch, turnTidToHashKey));
                // Walk tids and results in lock step; stop at the shorter of the two.
                final Iterator<String> tids = batch.iterator();
                final Iterator<Object> values = rows.iterator();
                while (tids.hasNext() && values.hasNext()) {
                    cacheCnt++;
                    final String tid = tids.next();
                    final Map<String, String> fields = redis.Hash_FormatResult((JedisByteHashMap) values.next());
                    TerGatewayData.getTerTempDataByMno(tid).refreshMemData(fields);
                }
            }
            final long lMs = System.currentTimeMillis() - startMs;
            logger.info(String.format("Bat Init TerCache [%d],Use:%d", cacheCnt, lMs));
        } catch (Exception e) {
            logger.error("初始化缓存异常", e);
        }
    }

    //endregion

    //region Batch insert of history records

    /**
     * Queues a position for batch insertion as a history record.
     *
     * @param terPos position to wrap in a TerHisPos and hand to the position batch handler
     */
    public static void addTerHisPos(TerPos terPos) {
        hisPosBatInsert.addHandleItem(new TerHisPos(terPos));
    }

    /**
     * Queues an alarm for batch insertion as a history record.
     *
     * @param terAlarm alarm to wrap in a TerHisAlarm and hand to the alarm batch handler
     */
    public static void addTerHisAlarm(TerAlarm terAlarm) {
        hisAlarmBatInsert.addHandleItem(new TerHisAlarm(terAlarm));
    }

    /**
     * Queues a terminal message for batch insertion as a history record.
     *
     * @param terMsg message to wrap in a TerHisMsg and hand to the message batch handler
     */
    public static void addTerMsg(TerMsg terMsg) {
        terMsgBatInsert.addHandleItem(new TerHisMsg(terMsg));
    }

    //endregion

    //region Redis Op
    /**
     * Stores the MQ route key of a device in its Redis hash, then calls persist on that hash.
     *
     * @param mno      device number
     * @param routeKey route key to store
     */
    public static void updateTerMqBinding(String mno, String routeKey) {
        final String hashId = getTerHashId(mno);
        final RedisService redis = TerGatewayMain.getRedis();
        redis.set(hashId, terMqRouteKey, routeKey);
        redis.persist(hashId);
    }

    /**
     * Updates the protocol version and reserve value of a device in Redis.
     * Both values are wrapped in a ProtocolData object and stored as JSON.
     *
     * @param mno     device number
     * @param version protocol version
     * @param reserve reserve value
     */
    public static void updateTerVerReve(String mno, byte version, int reserve) {
        final String json = JSONObject.toJSONString(new ProtocolData(version, reserve));
        TerGatewayMain.getRedis().set(getTerHashId(mno), terProtoclData, json);
    }
    //endregion

    //region getGjData

    /**
     * Gets one block of a firmware image.
     * The whole image is looked up in the in-memory firmware cache first; on a miss it is
     * loaded from the database and stored in the cache so that later block requests for the
     * same firmware do not hit the database again. Cached images are never evicted here.
     *
     * @param cusCode  customer code
     * @param hardCode hardware code
     * @param ver      version number
     * @param type     firmware type
     * @param block    block scheme id; converted to a block size by TerPubUtil.turnBlockIdToSize
     * @param index    block index
     * @return the requested block, or null when the firmware cannot be found
     */
    public static byte[] getGjData(int cusCode, int hardCode, int ver, int type, int block, int index) {
        String key = getGjCacheKey(cusCode, hardCode, ver, type);
        byte[] all = gJDataMap.get(key);
        if (null == all) {
            all = TerGatewayMain.getDbService().getGjData(new TersoftwareKey(cusCode, hardCode, ver, type));
            if (null != all) {
                // Populate the cache; if another thread loaded the same firmware first,
                // keep and use its copy so all callers share one array.
                byte[] prior = gJDataMap.putIfAbsent(key, all);
                if (null != prior) {
                    all = prior;
                }
            }
        }
        if (null == all) {
            logger.info(String.format("get cusCode:%d,hardCode:%d,ver:%d,type:%d firmware null", cusCode, hardCode, ver, type));
            return null;
        }
        int signMaxSize = TerPubUtil.turnBlockIdToSize(block);
        return TerPubUtil.getWA206BlockData(all, signMaxSize, index);
    }


    /**
     * Builds the firmware cache key: the four values joined by dots, e.g. "1.2.3.4".
     * Plain concatenation is used so that the key always consists of ASCII digits,
     * independent of the JVM's default locale.
     *
     * @param cusCode  customer code
     * @param hardCode hardware code
     * @param ver      version number
     * @param type     firmware type
     * @return the cache key
     */
    public static String getGjCacheKey(int cusCode, int hardCode, int ver, int type) {
        return cusCode + "." + hardCode + "." + ver + "." + type;
    }
    //endregion

    //region Online / offline related
    // Latest online item per device number; filled by updateOnlineItem and swapped out
    // for a new map by getAllUpdateOnlineItems, both under lastOnlineMapSyncObj.
    private static volatile HashMap<String, TerOnline> lastOnlineMap = new HashMap<>();
    private static ReentrantLock lastOnlineMapSyncObj = new ReentrantLock();

    // Pending online-history items; appended by insertOnlineItems and swapped out
    // for a new list by getAllInsertOnlineItems, both under hisOnlineListSyncObj.
    private static volatile List<TerOnlineHis> hisOnlineList = new LinkedList<>();
    private static ReentrantLock hisOnlineListSyncObj = new ReentrantLock();

    /**
     * Records the latest online item of a device, replacing any pending one for the same mno.
     *
     * @param item online item; its {@code getMno()} is used as the key
     */
    public static void updateOnlineItem(TerOnline item) {
        // Acquire before try so that finally only releases a lock that is held.
        lastOnlineMapSyncObj.lock();
        try {
            lastOnlineMap.put(item.getMno(), item);
        } finally {
            lastOnlineMapSyncObj.unlock();
        }
    }

    /**
     * Hands out all pending online items and starts a new, empty collection.
     * After the swap the returned map is no longer referenced by this class.
     *
     * @return the items recorded since the previous call (possibly empty, never null)
     */
    public static HashMap<String, TerOnline> getAllUpdateOnlineItems() {
        HashMap<String, TerOnline> fresh = new HashMap<>();
        // Acquire before try so that finally only releases a lock that is held.
        lastOnlineMapSyncObj.lock();
        try {
            HashMap<String, TerOnline> drained = lastOnlineMap;
            lastOnlineMap = fresh;
            return drained;
        } finally {
            lastOnlineMapSyncObj.unlock();
        }
    }

    /**
     * Appends online-history items to the pending list, preserving their order.
     * A null or empty list is a no-op and does not take the lock.
     *
     * @param items history items to queue
     */
    public static void insertOnlineItems(List<TerOnlineHis> items) {
        if (items == null || items.isEmpty()) {
            return;
        }
        // Acquire before try so that finally only releases a lock that is held.
        hisOnlineListSyncObj.lock();
        try {
            hisOnlineList.addAll(items);
        } finally {
            hisOnlineListSyncObj.unlock();
        }
    }

    /**
     * Hands out all pending online-history items and starts a new, empty list.
     * After the swap the returned list is no longer referenced by this class.
     *
     * @return the items queued since the previous call (possibly empty, never null)
     */
    public static List<TerOnlineHis> getAllInsertOnlineItems() {
        List<TerOnlineHis> fresh = new LinkedList<>();
        // Acquire before try so that finally only releases a lock that is held.
        hisOnlineListSyncObj.lock();
        try {
            List<TerOnlineHis> drained = hisOnlineList;
            hisOnlineList = fresh;
            return drained;
        } finally {
            hisOnlineListSyncObj.unlock();
        }
    }
    //endregion
}
